SAMZA-2067: Support Samza's running on Kubernetes#1197
SAMZA-2067: Support Samza's running on Kubernetes#1197weiqingy wants to merge 10 commits intoapache:masterfrom
Conversation
xinyuiscool
left a comment
There was a problem hiding this comment.
Finish about 1/3 of it. Need to sync up to understand the design more.
| coordinatorSystemConfig = | ||
| new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class)); | ||
| new MapConfig(SamzaObjectMapper.getObjectMapper() | ||
| .readValue(coordinatorSystemEnv.replace("\\\"", "\""), Config.class)); |
There was a problem hiding this comment.
Is there any reason we do this replacement?
There was a problem hiding this comment.
The RB has been updated and aligned with the master branch. The change here has gone.
| */ | ||
| private final ContainerAllocator containerAllocator; | ||
| private final Thread allocatorThread; | ||
| private Thread allocatorThread = null; |
There was a problem hiding this comment.
please keep this final and assign to different values in the constructor.
There was a problem hiding this comment.
Sure. The RB was updated, and the change here has gone.
| this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); | ||
| LOG.info("Finished container process manager initialization."); | ||
| if (shouldStartAllocateThread()) { | ||
| this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); |
There was a problem hiding this comment.
Do actually use containerAllocator anymore? Please walk through this part of the code with me offline.
| // In Kubernetes, the pod will be started by kubelet automatically once it is allocated, it does not need a | ||
| // separate thread to keep polling the allocated resources to start the container. | ||
| public boolean shouldStartAllocateThread() { | ||
| return !clusterResourceManager.getClass().getSimpleName().equals("KubeClusterResourceManager"); |
There was a problem hiding this comment.
This lookup of specific cluster manager seems pretty hard to maintain. Can we think of a better to distinguish when we need this to do container allocation?
| LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}", | ||
| state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no"); | ||
|
|
||
| if (shouldStartAllocateThread()) { |
There was a problem hiding this comment.
This If is just for logging the aliveness info "Is allocator thread alive". The RB has been updated. The aliveness info is now in the LOG.debug() in line 207 (keep the code same as the current codebase).
| return jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get() || !allocatorThread.isAlive(); | ||
|
|
||
| boolean shouldShutdown = jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get(); | ||
| return shouldStartAllocateThread() ? shouldShutdown || !allocatorThread.isAlive() : shouldShutdown; |
There was a problem hiding this comment.
I guess it's nicer to do a null check of the thread instead of doing shouldStartAllocateThread()
| import java.util.concurrent.atomic.AtomicReference | ||
|
|
||
| import org.apache.samza.{Partition, SamzaException} | ||
| import org.apache.samza.config._ |
There was a problem hiding this comment.
Seems all the changes in this file is just to reorg imports. If that's the case, I think this file can be left untouched.
There was a problem hiding this comment.
The RB has been updated and aligned with the master branch. The change here has gone.
There was a problem hiding this comment.
Three main things:
-
the integration with current ContainerAllocator is quite hard to understand. It's more like just try to fit in instead of organic part of it. I would suggest to think about whether the current ContainerAllocator can be split into two allocator, one sync and one async. The yarn can use the Async allocator, and kubernetes can use the sync one. The thread can be brought up for the async allocator.
-
error handling: there seems to be quite a few loose ends about failures, e.g. operatorPod watcher failure, create failure, delete failure. All these need to be tied up so it can provide production quality code.
-
unit tests are completely missing. I expect unit test for new classes and new methods as possible.
Minor:
- please add more comments and javadocs too.
|
|
||
| // the image name of samza | ||
| public static final String APP_IMAGE = "kube.app.image"; | ||
| public static final String DEFAULT_IMAGE = "weiqingyang/samza:v0"; |
There was a problem hiding this comment.
Probably not put your personal id as the default config.
There was a problem hiding this comment.
right. Updated it to "samza/samza:v0".
There was a problem hiding this comment.
Will create samza docker hub if needed.
| public static final String DEFAULT_DIRECTORY = "/opt/samza/"; | ||
|
|
||
| // the memory and the cpu cores of container | ||
| public static final String CLUSTER_MANAGER_CONTAINER_MEM_SIZE = "cluster-manager.container.memory.mb"; |
There was a problem hiding this comment.
Seems these configs should already be defined in the cluster manager config. If not, can we make sure it's shared?
There was a problem hiding this comment.
RB was updated to remove these duplicate config definitions.
| */ | ||
| public class KubeJob implements StreamJob { | ||
| private static final Logger LOG = LoggerFactory.getLogger(KubeJob.class); | ||
| private Config config; |
There was a problem hiding this comment.
mark all these vars final.
There was a problem hiding this comment.
Done in both KubeJob and KubeClusterResourceManager.
| this.kubernetesClient = KubeClientFactory.create(); | ||
| this.config = config; | ||
| this.podName = String.format(JC_POD_NAME_FORMAT, JC_CONTAINER_NAME_PREFIX, | ||
| config.get(APP_NAME, "samza"), config.get(APP_ID, "1")); |
There was a problem hiding this comment.
Please use ApplicaitonConfig.getAppName()/getAppId().
| /** | ||
| * Kubernetes related configurations | ||
| */ | ||
| public class KubeConfig { |
There was a problem hiding this comment.
I would suggest following the other Config class example to add getters for common configs, like namespace, jcPodName, etc. It's easier to centralize the place where these things are created. The rest of the code should just use these getters normally instead of directly accessing the config vars. It will be even better to make those static vars private.
There was a problem hiding this comment.
sounds good. will update this class.
| return cmdBuilder.buildCommand(); | ||
| } | ||
|
|
||
| private CommandBuilder getCommandBuilder(String containerId) { |
There was a problem hiding this comment.
this method should be shared with Yarn I believe.
| } | ||
|
|
||
| // Construct the envs for the task container pod | ||
| private List<EnvVar> getEnvs(CommandBuilder cmdBuilder) { |
There was a problem hiding this comment.
Can we make it an static util method?
| */ | ||
|
|
||
| // TODO: SAMZA-2369: Add a logging thread which is similar to LoggingPodStatusWatcher in Spark | ||
| public class KubePodStatusWatcher implements Watcher<Pod> { |
There was a problem hiding this comment.
Seems we already have a watcher in job coordinator. Any idea what this thing does? I don't see it's been used.
| /** | ||
| * Convenient utility class with static methods. | ||
| */ | ||
| public class KubeUtils { |
There was a problem hiding this comment.
For each method, you need to put javadocs and add tests.
|
|
||
| package org.apache.samza.job.kubernetes; | ||
|
|
||
| import io.fabric8.kubernetes.api.model.*; |
There was a problem hiding this comment.
Please make sure NOT use wild card imports for all the source files.
|
@xinyuiscool Thanks for the reviewing. I'll update the PR to fix the comments soon. |
|
@weiqingy Is this still being worked on? |
|
Hi @mynameborat Yes, we are actively working on it, targeting Samza 1.5. |
|
@weiqingy Samza 1.5 already released, any update on this release? |
|
@rohitverma02 sorry for the late reply. Will update here about the release plan later. Thanks. |
|
@weiqingy Is this still being worked on? |
|
Hey all, this sounds like a great idea. Imho adding another deployment target (especially one as popular as Kubernetes) would broaden Samza's general adoption appeal. Is this still being targeted for a release, maybe 1.9 at this point? |
|
@weiqingy My team is very interested in this change, as we are looking to move our Samza application to Kubernetes. Is this still being actively worked on? We're interested in helping contribute to this feature, or continuing development on it if this is no longer being worked on. |
|
Hi, Noah,
Happy to learn that you are interested in this feature. Unfortunately, due
to business priority changes, we don't plan to work on that in the near
future. I would encourage your team to pick it up if it fits your interests.
Regards!
…-Yi
On Thu, Feb 23, 2023 at 2:24 PM Noah Stapp ***@***.***> wrote:
@weiqingy <https://github.com/weiqingy> My team is very interested in
this change, as we are looking to move our Samza application to Kubernetes.
Is this still being actively worked on? We're interested in helping
contribute to this feature, or continuing development on it if this is no
longer being worked on.
—
Reply to this email directly, view it on GitHub
<#1197 (comment)>, or
unsubscribe
<https://github.com/notifications/unsubscribe-auth/ACL6SBWFE44RPJ4OW36SMFLWY7PR5ANCNFSM4JCOAWAQ>
.
You are receiving this because you are subscribed to this thread.Message
ID: ***@***.***>
|
|
Hey @ayakkala1 @hoprocker @NoahStapp Thanks for reviewing and commenting! I haven't got a chance to update PR against the master branch yet, and there are many new changes since the last commit of the PR. Welcome to contribute to this feature and pick it up to catch up your team's timeline. Best regards, |
What changes were proposed in this pull request?
Make Samza jobs able to run on Kubernetes natively.
How was this patch tested?
Run Samza jobs on AKS.